In [ ]:
import numpy as np
import pandas as pd
import mmlspark
Next, download and load the CSV dataset.
In [ ]:
# Load raw data from a small (~30 MB) CSV file, trimmed to contain just the
# columns this notebook uses.
dataFile = "On_Time_Performance_2012_9.csv"
import os
# NOTE: `import urllib` alone does not bind the `request` submodule in
# Python 3 — the submodule must be imported explicitly or
# `urllib.request.urlretrieve` may raise AttributeError.
import urllib.request
# Download once; skip the network call if the file is already cached locally.
if not os.path.isfile(dataFile):
    urllib.request.urlretrieve("https://mmlspark.azureedge.net/datasets/" + dataFile, dataFile)
# Read with pandas, forcing the numeric columns to float64 so Spark infers a
# consistent schema, then promote the result to a Spark DataFrame.
# (`spark` is the SparkSession provided by the notebook environment.)
flightDelay = spark.createDataFrame(
    pd.read_csv(dataFile, dtype={"Month": np.float64, "Quarter": np.float64,
                                 "DayofMonth": np.float64, "DayOfWeek": np.float64,
                                 "OriginAirportID": np.float64, "DestAirportID": np.float64,
                                 "CRSDepTime": np.float64, "CRSArrTime": np.float64}))
# Print information on the dataset we loaded
print("records read: " + str(flightDelay.count()))
print("Schema:")
flightDelay.printSchema()
flightDelay.limit(10).toPandas()
Split the dataset into train and test sets.
In [ ]:
# Hold out 25% of the rows for evaluation. A fixed seed makes the split —
# and therefore the reported metrics — reproducible across notebook runs.
train, test = flightDelay.randomSplit([0.75, 0.25], seed=42)
Train a linear regressor on the dataset using the L-BFGS solver.
In [ ]:
from mmlspark import TrainRegressor, TrainedRegressorModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
# Convert string-valued columns to categorical (numeric) indices.
# Each indexer is fit on the training split only, so a label that appears
# solely in the test split would be unseen at transform time;
# handleInvalid="skip" drops such rows instead of raising an exception.
catCols = ["Carrier", "DepTimeBlk", "ArrTimeBlk"]
trainCat = train
testCat = test
for catCol in catCols:
    simodel = StringIndexer(inputCol=catCol, outputCol=catCol + "Tmp",
                            handleInvalid="skip").fit(train)
    # Replace the original string column with its indexed equivalent,
    # keeping the original column name.
    trainCat = simodel.transform(trainCat).drop(catCol).withColumnRenamed(catCol + "Tmp", catCol)
    testCat = simodel.transform(testCat).drop(catCol).withColumnRenamed(catCol + "Tmp", catCol)
# Linear regression solved with L-BFGS, regularized with elastic net
# (regParam=0.1 overall strength, elasticNetParam=0.3 L1/L2 mix).
lr = LinearRegression().setSolver("l-bfgs").setRegParam(0.1).setElasticNetParam(0.3)
# TrainRegressor featurizes the remaining columns and fits against ArrDelay.
model = TrainRegressor(model=lr, labelCol="ArrDelay").fit(trainCat)
# Persist the trained model so the scoring cell below can reload it.
model.write().overwrite().save("flightDelayModel.mml")
Score the regressor on the test data.
In [ ]:
# Reload the persisted regressor from disk and score the held-out test split.
flightDelayModel = TrainedRegressorModel.load("flightDelayModel.mml")
scoredData = flightDelayModel.transform(testCat)
# Preview the first few scored rows as a pandas table.
scoredData.limit(10).toPandas()
Compute model metrics against the entire scored dataset.
In [ ]:
from mmlspark import ComputeModelStatistics
# Aggregate regression metrics over every scored row.
metrics = ComputeModelStatistics().transform(scoredData)
# Display the metrics as a pandas table.
metrics.toPandas()
Finally, compute and show per-instance statistics, demonstrating the usage
of ComputePerInstanceStatistics.
In [ ]:
from mmlspark import ComputePerInstanceStatistics
# Per-row error statistics (L1/L2 loss) rather than dataset-level aggregates.
evalPerInstance = ComputePerInstanceStatistics().transform(scoredData)
# Show label, prediction, and per-row losses for the first few rows.
evalPerInstance.select("ArrDelay", "Scores", "L1_loss", "L2_loss").limit(10).toPandas()